/*
 *Copyright 2014 DDPush
 *Author: AndyKwok(in English) GuoZhengzhu(in Chinese)
 *Email: ddpush@126.com
 *

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

*/
package org.ddpush.im.v1.client.appuser;

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

/**
 * UDP客户端基类
 * 
 * @see http://blog.csdn.net/brok1n/article/details/44537237
 *
 */
/**
 * @author lilei
 *
 */
public abstract class UDPClientBase implements Runnable {

	/**
	 * UDP套接字
	 */
	protected DatagramSocket ds;
	/**
	 * 最后发包时间
	 */
	protected long lastSent = 0;
	/**
	 * Socket端口
	 */
	protected int remotePort = 9966;
	/** 应用ID */
	protected int appid = 1;
	/** uuid **/
	protected byte[] uuid;
	/**
	 * 服务器地址
	 */
	protected String remoteAddress = null;
	/**
	 * 并发的 消息队列
	 */
	protected ConcurrentLinkedQueue<Message> mq = new ConcurrentLinkedQueue<Message>();

	/**
	 * 向消息队列里添加消息的计数器
	 */
	protected AtomicLong queueIn = new AtomicLong(0);
	/**
	 * 从消息队列里取出消息的计数器
	 */
	protected AtomicLong queueOut = new AtomicLong(0);

	/**
	 * 消息缓冲区长度
	 */
	protected int bufferSize = 1024;
	/**
	 * 心跳时间间隔 单位（秒）
	 * 
	 */
	protected int heartbeatInterval = 50;

	/**
	 * 存放消息数据的byte[]
	 */
	protected byte[] bufferArray;
	/**
	 * 消息数据包缓冲区
	 */
	protected ByteBuffer buffer;
	/**
	 * 是否需要重置客户端和服务器端连接标志位
	 */
	protected boolean needReset = true;

	/**
	 * 客户端 运行状态标识
	 */
	protected boolean started = false;
	/**
	 * 客户端 停止状态标识
	 */
	protected boolean stoped = false;

	/**
	 * 当前UDPClient线程
	 */
	protected Thread receiverT;
	/**
	 * 消息处理类
	 */
	protected Worker worker;
	/**
	 * 消息处理线程
	 */
	protected Thread workerT;

	/**
	 * 发送数据包的个数
	 */
	private long sentPackets;
	/**
	 * 接收数据包的个数
	 */
	private long receivedPackets;

	public UDPClientBase(byte[] uuid, int appid, String serverAddr, int serverPort)
			throws Exception {
		if (uuid == null || uuid.length != 16) {
			throw new java.lang.IllegalArgumentException(
					"uuid byte array must be not null and length of 16 bytes");
		}
		if (appid < 1 || appid > 255) {
			throw new java.lang.IllegalArgumentException("appid must be from 1 to 255");
		}
		if (serverAddr == null || serverAddr.trim().length() == 0) {
			throw new java.lang.IllegalArgumentException(
					"server address illegal: " + serverAddr);
		}

		this.uuid = uuid;
		this.appid = appid;
		this.remoteAddress = serverAddr;
		this.remotePort = serverPort;
	}

	/**
	 * 入队 将消息包添加到消息队列
	 * 
	 * @param message
	 * @return
	 */
	protected boolean enqueue(Message message) {
		boolean result = mq.add(message);
		if (result == true) {
			queueIn.addAndGet(1);
		}
		return result;
	}

	/**
	 * 出队 从消息队列取出消息包
	 * 
	 * @return
	 */
	protected Message dequeue() {
		Message m = mq.poll();
		if (m != null) {
			queueOut.addAndGet(1);
		}
		return m;
	}

	/**
	 * 初始化消息数据存放数组和缓冲区
	 */
	private synchronized void init() {
		bufferArray = new byte[bufferSize];
		buffer = ByteBuffer.wrap(bufferArray);
	}

	/**
	 * 客户端重置 如果连接被关闭，就重新连接
	 * 
	 * @throws Exception
	 */
	protected synchronized void reset() throws Exception {
		// 如果连接是正常的。就不重新连接
		if (needReset == false) {
			return;
		}
		// 销毁当前UDP连接
		if (ds != null) {
			try {
				ds.close();
			} catch (Exception e) {
			}
		}
		// 如果当前网络状态正常 就从新创建UDP连接
		if (hasNetworkConnection() == true) {
			ds = new DatagramSocket();
			ds.connect(new InetSocketAddress(remoteAddress, remotePort));
			needReset = false;
		} else {
			try {
				Thread.sleep(1000);
			} catch (Exception e) {
			}
		}
	}

	/**
	 * 启动UDP客户端
	 * 
	 * @throws Exception
	 */
	public synchronized void start() throws Exception {
		// 如果UDP客户端启动标识为true 就说明UDP客户端已经被启动了，就不需要再次启动
		if (this.started == true) {
			return;
		}
		// 初始化缓冲区以及存放数据包数据的数组
		this.init();
		// 创建当前UDPClient线程 用来发送心跳包 以及接收服务器发送的消息包
		receiverT = new Thread(this, "udp-client-receiver");
		// 设置为守护线程
		receiverT.setDaemon(true);
		synchronized (receiverT) {
			receiverT.start();
			receiverT.wait();
		}
		// 创建工作线程，用来处理接收到的消息包
		worker = new Worker();
		workerT = new Thread(worker, "udp-client-worker");
		workerT.setDaemon(true);
		synchronized (workerT) {
			workerT.start();
			workerT.wait();
		}

		this.started = true;
	}

	/**
	 * 停止UDP客户端
	 * 
	 * @throws Exception
	 */
	public void stop() throws Exception {
		// 修改UDP客户端 停止状态标志
		stoped = true;
		// 销毁UDP连接
		if (ds != null) {
			try {
				ds.close();
			} catch (Exception e) {
			}
			ds = null;
		}

		// 关闭 中断 当前UDP客户端线程
		if (receiverT != null) {
			try {
				receiverT.interrupt();
			} catch (Exception e) {
			}
		}

		// 关闭 中断 消息队列处理线程
		if (workerT != null) {
			try {
				workerT.interrupt();
			} catch (Exception e) {
			}
		}
	}

	// UDP客户端线程体
	public void run() {

		synchronized (receiverT) {
			receiverT.notifyAll();
		}
		// 如果UDP客户端没有被stop就一直运行
		while (stoped == false) {
			try {// 跟TCPClient一样。检测网络连接直到网络连接正常
				if (hasNetworkConnection() == false) {
					try {
						trySystemSleep();
						Thread.sleep(1000);
					} catch (Exception e) {
					}
					continue;
				}
				// 检测是否需要重新连接服务器
				reset();
				// 发送心跳包
				heartbeat();
				// 接收服务器发送过来的数据
				receiveData();
			} catch (java.net.SocketTimeoutException e) {

			} catch (Exception e) {
				// 出现异常了 就重置链接
				e.printStackTrace();
				this.needReset = true;
			} catch (Throwable t) {
				t.printStackTrace();
				this.needReset = true;
			} finally {

				// 如果连接出问题了。就等一会儿。
				if (needReset == true) {
					try {
						trySystemSleep();
						Thread.sleep(1000);
					} catch (Exception e) {
					}
				}
				if (mq.isEmpty() == true || hasNetworkConnection() == false) {
					try {
						trySystemSleep();
						Thread.sleep(1000);
					} catch (Exception e) {
					}
				}
			}
		}
		// 释放UDP连接
		if (ds != null) {
			try {
				ds.close();
			} catch (Exception e) {
			}
			ds = null;
		}
	}

	/**
	 * 向服务器发送心跳包 跟tcp的一样
	 * 
	 * {@link org.ddpush.im.v1.client.appuser.TCPClientBase#heartbeat()}
	 * 
	 * @throws Exception
	 */
	private void heartbeat() throws Exception {
		if (System.currentTimeMillis() - lastSent < heartbeatInterval * 1000) {
			return;
		}
		byte[] buffer = new byte[Message.CLIENT_MESSAGE_MIN_LENGTH];
		ByteBuffer.wrap(buffer).put((byte) Message.version).put((byte) appid)
				.put((byte) Message.CMD_0x00).put(uuid).putChar((char) 0);
		send(buffer);
	}

	/**
	 * 接收服务器发送的消息
	 * 
	 * @throws Exception
	 */
	private void receiveData() throws Exception {
		// 标准的UDP收包过程
		DatagramPacket dp = new DatagramPacket(bufferArray, bufferArray.length);
		ds.setSoTimeout(5 * 1000);
		ds.receive(dp);

		// 如果收到的数据包内容出错就丢西这个UDP数据包
		if (dp.getLength() <= 0 || dp.getData() == null || dp.getData().length == 0) {
			return;
		}
		// 根据收到的数据长度创建一个临时的数据存储数组
		byte[] data = new byte[dp.getLength()];
		// 将数据拷贝到临时数组 用来创建Message消息包
		System.arraycopy(dp.getData(), 0, data, 0, dp.getLength());
		// 用收到的数据创建Message消息包
		Message m = new Message(dp.getSocketAddress(), data);
		// 检测这个消息格式是否正确，格式不正确就丢弃
		if (m.checkFormat() == false) {
			return;
		}
		// 记录收到数据包个数
		this.receivedPackets++;
		// 告诉服务器 我收到了一个 xxx样子的数据包
		this.ackServer(m);
		// 如果是心跳包，直接丢弃，不用处理
		if (m.getCmd() == Message.CMD_0x00) {
			return;
		}
		// 将收到的消息包添加到消息队列中
		this.enqueue(m);
		// 唤醒消息队列处理线程，开始处理消息
		worker.wakeup();
	}

	/**
	 * 告诉服务器，我收到了一个什么样子的数据包
	 * 
	 * @param m
	 * @throws Exception
	 */
	private void ackServer(Message m) throws Exception {
		if (m.getCmd() == Message.CMD_0x10) {
			byte[] buffer = new byte[Message.CLIENT_MESSAGE_MIN_LENGTH];
			ByteBuffer.wrap(buffer).put((byte) Message.version).put((byte) appid)
					.put((byte) Message.CMD_0x10).put(uuid).putChar((char) 0);
			send(buffer);
		}
		if (m.getCmd() == Message.CMD_0x11) {
			byte[] buffer = new byte[Message.CLIENT_MESSAGE_MIN_LENGTH + 8];
			byte[] data = m.getData();
			ByteBuffer.wrap(buffer).put((byte) Message.version).put((byte) appid)
					.put((byte) Message.CMD_0x11).put(uuid).putChar((char) 8)
					.put(data, Message.SERVER_MESSAGE_MIN_LENGTH, 8);
			send(buffer);
		}
		if (m.getCmd() == Message.CMD_0x20) {
			byte[] buffer = new byte[Message.CLIENT_MESSAGE_MIN_LENGTH];
			ByteBuffer.wrap(buffer).put((byte) Message.version).put((byte) appid)
					.put((byte) Message.CMD_0x20).put(uuid).putChar((char) 0);
			send(buffer);
		}
	}

	/**
	 * 向服务器发送数据
	 * 
	 * @param data
	 * @throws Exception
	 */
	private void send(byte[] data) throws Exception {
		// 数据 以及 UDP连接常规验证
		if (data == null) {
			return;
		}
		if (ds == null) {
			return;
		}

		// 创建UDP数据包 并发送
		DatagramPacket dp = new DatagramPacket(data, data.length);
		dp.setSocketAddress(ds.getRemoteSocketAddress());
		// 记录本次发送数据包的时间
		ds.send(dp);
		lastSent = System.currentTimeMillis();
		/// 增加发送数据包数量
		this.sentPackets++;
	}

	/**
	 * 获取发送数据包个数
	 * 
	 * @return
	 * 
	 */
	public long getSentPackets() {
		return this.sentPackets;
	}

	/**
	 * 获取接收到数据包的个数
	 * 
	 * @return
	 */
	public long getReceivedPackets() {
		return this.receivedPackets;
	}

	/**
	 * 设置要连接的服务器的端口号
	 * 
	 * @param port
	 */
	public void setServerPort(int port) {
		this.remotePort = port;
	}

	/**
	 * 获取服务器端口号
	 * 
	 * @return
	 */
	public int getServerPort() {
		return this.remotePort;
	}

	/**
	 * 设置服务器地址
	 * 
	 * @param addr
	 */
	public void setServerAddress(String addr) {
		this.remoteAddress = addr;
	}

	/**
	 * 取得服务器地址
	 * 
	 * @return
	 */
	public String getServerAddress() {
		return this.remoteAddress;
	}

	/**
	 * 设置存放消息数据的数组大小
	 * 
	 * @param bytes
	 */
	public void setBufferSize(int bytes) {
		this.bufferSize = bytes;
	}

	/**
	 * 获取存放消息数据的数组大小
	 * 
	 * @return
	 */
	public int getBufferSize() {
		return this.bufferSize;
	}

	/**
	 * 获取最后心跳时间 注意是心跳时间
	 * 
	 * @return
	 */
	public long getLastHeartbeatTime() {
		return lastSent;
	}

	/**
	 * 设置心跳包发送的时间间隔
	 */
	public void setHeartbeatInterval(int second) {
		if (second <= 0) {
			return;
		}
		this.heartbeatInterval = second;
	}

	/**
	 * 获取心跳包的发送时间间隔
	 * 
	 * @return
	 */
	public int getHeartbeatInterval() {
		return this.heartbeatInterval;
	}

	/**
	 * 验证网络连接状态
	 * 
	 * @return
	 */
	public abstract boolean hasNetworkConnection();

	/**
	 * 休眠
	 */
	public abstract void trySystemSleep();

	/**
	 * 收到消息数据包的具体处理回调
	 * 
	 * @param message
	 */
	public abstract void onPushMessage(Message message);

	/**
	 * 消息队列处理线程类
	 *
	 */
	class Worker implements Runnable {
		public void run() {
			synchronized (workerT) {
				workerT.notifyAll();
			}
			// 如果UDP客户端没有被stop 就一直处理消息
			while (stoped == false) {
				try {
					// 消息队列处理
					handleEvent();
				} catch (Exception e) {
					e.printStackTrace();
				} finally {
					waitMsg();
				}
			}
		}

		private void waitMsg() {
			synchronized (this) {
				try {
					this.wait(1000);
				} catch (java.lang.InterruptedException e) {

				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}

		private void wakeup() {
			synchronized (this) {
				this.notifyAll();
			}
		}

		/**
		 * 消息队列处理
		 * 
		 * @throws Exception
		 */
		private void handleEvent() throws Exception {
			Message m = null;
			// 处理所有的消息队列中得消息包
			while (true) {
				// 取出一个消息包，如果取出的消息包为空就返回（消息队列没消息了）
				m = dequeue();
				if (m == null) {
					return;
				}
				// 如果取出的消息格式不对。就丢弃，继续处理下一个数据包
				if (m.checkFormat() == false) {
					continue;
				}

				// 消息包处理回调 real work here
				onPushMessage(m);
			}
			// finish work here, such as release wake lock
		}

	}
}
